package mqttclient

import (
	"errors"
	"fmt"
	"time"

	"go-zero-admin/apps/modules/iot/cmd/link/message"
	"go-zero-admin/apps/modules/iot/cmd/link/network"
	"go-zero-admin/apps/modules/iot/cmd/link/network/constant"
	"go-zero-admin/pkg/tool"

	"github.com/duke-git/lancet/v2/convertor"
	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// Compile-time assertion that *MqttClient implements IMqttClient.
var _ IMqttClient = (*MqttClient)(nil)

// IMqttClient is the contract of an MQTT client network component: everything
// required by network.INetwork plus topic subscription and publishing.
type IMqttClient interface {
	network.INetwork
	// Subscribe subscribes to the given topics.
	Subscribe(topics []string) error
	// SubscribeQos subscribes to the given topics at the given QoS level.
	SubscribeQos(topics []string, qos byte) error
	// Publish publishes a message.
	Publish(msg message.IMqttMessage) error
}

// MqttClient wraps a paho mqtt.Client together with the bookkeeping needed to
// queue work while the client is loading and to track topic subscriptions.
type MqttClient struct {
	// Id is the identifier returned by GetId.
	Id int64
	// Client is the underlying paho client; it is nil after Shutdown.
	Client mqtt.Client
	// Loading marks the client as still loading; while it is true, Publish
	// queues messages instead of sending them.
	Loading bool
	// LoadSuccessListener holds the deferred calls (argument -> function) that
	// SetLoading(false) runs and then clears.
	LoadSuccessListener []map[message.IMqttMessage]func(message.IMqttMessage) error
	// Subscriber maps a subscribed topic to its QoS, stored as a string.
	Subscriber map[string]string
	// SubCallBackHandle is the message handler passed to every Subscribe call.
	SubCallBackHandle func(mqtt.Client, mqtt.Message)
	// UnSubscriber maps a subscribed topic to a function that unsubscribes it.
	UnSubscriber map[string]func() error
}

// GetId returns the identifier stored in the Id field.
func (m *MqttClient) GetId() int64 {
	return m.Id
}

// GetType returns the network type obtained by calling Of(constant.MqttClient)
// on a freshly allocated network.NetworkType.
func (m *MqttClient) GetType() *network.NetworkType {
	return new(network.NetworkType).Of(constant.MqttClient)
}

// Shutdown clears the loading flag and, when the client is currently alive,
// disconnects it (passing 250 to Disconnect) and drops the reference.
// A client that is not alive is left untouched.
func (m *MqttClient) Shutdown() {
	m.Loading = false
	if m.IsAlive() {
		m.Client.Disconnect(250)
		m.Client = nil
	}
}

// IsAlive reports whether a client is set and that client reports itself as
// connected.
func (m *MqttClient) IsAlive() bool {
	if m.Client == nil {
		return false
	}
	return m.Client.IsConnected()
}

// IsAutoReload always returns true for the MQTT client.
func (m *MqttClient) IsAutoReload() bool {
	return true
}

// GetTarget returns the receiver itself, typed as any.
func (m *MqttClient) GetTarget() any {
	return m
}

// Subscribe subscribes to topics at constant.MqttClientQos0 by delegating to
// SubscribeQos, and returns whatever SubscribeQos returns.
func (m *MqttClient) Subscribe(topics []string) error {
	return m.SubscribeQos(topics, constant.MqttClientQos0)
}

// SubscribeQos subscribes to each topic in topics at the given QoS level.
//
// A topic is skipped when it already has a non-empty entry in m.Subscriber,
// when the client is not alive, or when the broker subscription fails; these
// cases are best-effort and never produce an error, so the method always
// returns nil. For every topic that is subscribed successfully, the QoS is
// recorded in m.Subscriber as decimal text and an unsubscribe function is
// stored in m.UnSubscriber.
func (m *MqttClient) SubscribeQos(topics []string, qos byte) error {
	// Create the bookkeeping maps on demand: writing to a nil map panics.
	if m.Subscriber == nil {
		m.Subscriber = make(map[string]string)
	}
	if m.UnSubscriber == nil {
		m.UnSubscriber = make(map[string]func() error)
	}

	for _, v := range topics {
		// Per-iteration copy: before Go 1.22 the loop variable is shared by all
		// iterations, so a closure capturing it would see only the last topic.
		topic := v

		// Skip topics that are already recorded, or when the client is down.
		if m.Subscriber[topic] != "" || !m.IsAlive() {
			continue
		}

		// A failed subscription is skipped rather than reported.
		if token := m.Client.Subscribe(topic, qos, m.SubCallBackHandle); token.Wait() && token.Error() != nil {
			continue
		}

		// Store the QoS as decimal text ("0", "1", "2"). string(qos) would yield
		// the character with that code point, which cannot be parsed back into
		// a number when the subscriptions are restored.
		m.Subscriber[topic] = fmt.Sprint(qos)

		// Register the function that cancels this subscription.
		m.UnSubscriber[topic] = func() error {
			if token := m.Client.Unsubscribe(topic); token.Wait() && token.Error() != nil {
				return fmt.Errorf("MQTT取消订阅失败：ID：%d，主题：%s: %w", m.GetId(), topic, token.Error())
			}
			return nil
		}
	}
	return nil
}

// Publish sends msg through the client. While the client is loading, the
// message is not sent; it is appended to LoadSuccessListener paired with
// doPublish, and nil is returned. Otherwise the result of doPublish is
// returned directly.
func (m *MqttClient) Publish(msg message.IMqttMessage) error {
	if !m.Loading {
		return m.doPublish(msg)
	}

	deferred := map[message.IMqttMessage]func(message.IMqttMessage) error{msg: m.doPublish}
	m.LoadSuccessListener = append(m.LoadSuccessListener, deferred)
	return nil
}

// doPublish publishes msg through the underlying client and blocks until the
// publish token completes.
//
// It returns an error when no client is set (for example after Shutdown) or
// when the token reports a failure; in the latter case the token's error is
// wrapped so callers can inspect it with errors.Is / errors.As.
func (m *MqttClient) doPublish(msg message.IMqttMessage) error {
	// Without a client there is nothing to publish on; fail instead of
	// dereferencing nil.
	if m.Client == nil {
		return errors.New("MQTT发布失败：客户端未初始化")
	}

	token := m.Client.Publish(msg.GetTopic(), byte(msg.GetQosLevel()), msg.IsRetain(), msg.GetPayload())
	// WaitTimeout returns false when its 3-second window elapses before the
	// token completes; the loop then waits again, so this blocks until the
	// token is done.
	for !token.WaitTimeout(3 * time.Second) {
	}
	if err := token.Error(); err != nil {
		return fmt.Errorf("MQTT发布失败：ID：%d，发布消息：%+v: %w", m.GetId(), msg, err)
	}

	return nil
}

// SetLoading updates the loading flag. When loading becomes false, every
// deferred call queued in LoadSuccessListener is started in its own goroutine
// with its paired argument, and the queue is then cleared. The results of
// those calls are not collected.
func (m *MqttClient) SetLoading(loading bool) {
	m.Loading = loading
	if !loading {
		for i := range m.LoadSuccessListener {
			// Each entry maps the argument to the function to invoke with it.
			for arg, fn := range m.LoadSuccessListener[i] {
				go fn(arg)
			}
		}
		m.LoadSuccessListener = nil
	}
}

// IsLoading reports the current value of the Loading flag.
func (m *MqttClient) IsLoading() bool {
	return m.Loading
}

// SetClient installs client as the underlying MQTT client.
//
// A previously set, different client is disconnected first. The subscription
// callback is (re)built, and the recorded subscriptions are then restored:
// queued for later via LoadSuccessListener while the client is loading, or
// re-subscribed immediately when the client is alive. An error is returned
// only when the immediate re-subscription fails.
func (m *MqttClient) SetClient(client mqtt.Client) error {
	// Disconnect the client being replaced, unless it is the same one.
	if old := m.Client; old != nil && old != client {
		old.Disconnect(250)
	}
	m.Client = client

	m.SubCallBackHandle = func(_ mqtt.Client, msg mqtt.Message) {
		// Convert the received message into the project's MQTT message.
		mqttMessage := message.Build().
			MessageIdBuild(int64(msg.MessageID())).
			TopicBuild(msg.Topic()).
			PayloadByteBuild(msg.Payload()).
			DupBuild(msg.Duplicate()).
			RetainBuild(msg.Retained()).
			QosLevelBuild(int64(msg.Qos()))
		// Serialize it to gzip-compressed JSON for transport.
		mqttMsgData := tool.MarshalToJsonWithGzip(mqttMessage)
		if len(m.Subscriber) > 0 {
			// Placeholder: the payload is only formatted and discarded.
			_ = fmt.Sprint(mqttMsgData)
		}

		// Disabled forwarding step: send one message per topic this client
		// has subscribed to.
		//topics := Subscriber[1]
		//for range topics {
		//	_ = fmt.Sprint(mqttMsgData)
		//	//myNats.NatsApi.PubAsyncJs(&myNats.NatsHandleJsProperties{Topic: constant.MqttMsgNatsQue + networkProperties.Id, Payload: mqttMsgData})
		//}
	}

	if m.Loading {
		// Defer the re-subscription until loading completes.
		pending := map[message.IMqttMessage]func(message.IMqttMessage) error{nil: m.reSubscribe}
		m.LoadSuccessListener = append(m.LoadSuccessListener, pending)
		return nil
	}
	if m.IsAlive() {
		return m.reSubscribe(nil)
	}
	return nil
}

// reSubscribe subscribes again to every topic recorded in m.Subscriber, using
// the QoS stored for it and the current subscription callback. The message
// argument is ignored; it exists so the method matches the signature of the
// deferred calls kept in LoadSuccessListener.
//
// It stops at the first failed subscription and returns an error naming the
// topic and wrapping the token's error.
func (m *MqttClient) reSubscribe(_ message.IMqttMessage) error {
	for k, v := range m.Subscriber {
		qos, err := convertor.ToInt(v)
		// Tolerate entries that hold the QoS as a single raw byte (0, 1 or 2)
		// instead of decimal text. Any other unparseable value falls back to
		// the parser's result, as before.
		if err != nil && len(v) == 1 && v[0] <= 2 {
			qos = int64(v[0])
		}
		if token := m.Client.Subscribe(k, byte(qos), m.SubCallBackHandle); token.Wait() && token.Error() != nil {
			// Report the topic (k), not the stored QoS string.
			return fmt.Errorf("MQTT重新订阅失败：ID：%d，主题：%s: %w", m.GetId(), k, token.Error())
		}
	}
	return nil
}
